package com.process;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.base.MQBaseETL;
import com.bean.CommentsEntity;
import com.bean.CommentsWideEntity;
import com.bean.DimGoodsDBEntity;
import com.bean.DimShopsDBEntity;
import com.utils.*;
import org.apache.commons.net.ntp.TimeStamp;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import redis.clients.jedis.Jedis;

import java.util.Date;

/**
 * Real-time ETL job for comment data.
 *
 * <p>Takes the stream returned by {@code MQBaseETL.KafkaConsummer} for the topic
 * {@code ConfigReader.input_topic_comments}, parses each message into a
 * {@link CommentsEntity}, widens it with goods and shop attributes read from Redis
 * hashes, and sends the widened record as JSON to the {@code dwd_comments} Kafka topic.
 *
 * @Author: Sky
 * @Times : 2021/8/14 10:24
 */
public class CommentsDataETL {

    /**
     * Job entry point: builds the pipeline and runs it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        CommentsDataETL commentsDataETL = new CommentsDataETL();
        commentsDataETL.process();
    }

    /**
     * Builds the comment-widening pipeline and executes it with parallelism 1.
     *
     * @throws Exception if the job cannot be built or fails during execution
     */
    public void process() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        MQBaseETL baseEtl = new MQBaseETL();
        DataStream<String> commentDs = baseEtl.KafkaConsummer(ConfigReader.input_topic_comments, env);

        // Parse each raw JSON message into a CommentsEntity bean.
        DataStream<CommentsEntity> commentsBeanDS = commentDs.map(new RichMapFunction<String, CommentsEntity>() {
            @Override
            public CommentsEntity map(String s) throws Exception {
                return JSON.parseObject(s, CommentsEntity.class);
            }
        });

        // Widen the comment record with goods and shop dimension data from Redis.
        DataStream<CommentsWideEntity> commentsJsonDataStream = commentsBeanDS.map(new RichMapFunction<CommentsEntity, CommentsWideEntity>() {
            // Created in open() on the task side; never shipped with the serialized function.
            private transient Jedis jedis;

            @Override
            public void open(Configuration parameters) throws Exception {
                jedis = RedisUtil.getJedis().getResource();
                // The dimension hashes are read from Redis database index 1.
                jedis.select(1);
            }

            @Override
            public void close() throws Exception {
                // Always release the handle, even if the connection has dropped;
                // skipping close() on a disconnected handle would never give it back.
                if (jedis != null) {
                    jedis.close();
                }
            }

            /**
             * Copies the comment fields into a wide record and fills in goods name,
             * shop id and shop name when the corresponding Redis entries exist.
             * Missing dimension data leaves those fields unset instead of failing the job.
             */
            @Override
            public CommentsWideEntity map(CommentsEntity comments) throws Exception {
                System.out.println("goodsJson start..." + comments.getGoodsId());
                String goodsJSON = jedis.hget("foo_shop:dim_goods", comments.getGoodsId());
                System.out.println("goodsJSON" + goodsJSON);
                DimGoodsDBEntity goodInfo = DimGoodsDBEntity.getGoodInfo(goodsJSON);

                // Convert the comment timestamp into a Date.
                // NOTE(review): commons-net TimeStamp(long) interprets its argument as a 64-bit
                // NTP timestamp, not epoch milliseconds — confirm the type and units of
                // CommentsEntity.getTimestamp(); if it is epoch millis the resulting date is wrong.
                TimeStamp timeStamp = new TimeStamp(comments.getTimestamp());
                Date date = new Date(timeStamp.getTime());

                // Fields copied straight from the source comment.
                CommentsWideEntity commentsWideEntity = new CommentsWideEntity();
                commentsWideEntity.setUserId(comments.getUserId());
                commentsWideEntity.setUserName(comments.getUserName());
                commentsWideEntity.setOrderGoodsId(comments.getOrderGoodsId());
                commentsWideEntity.setStarScore(comments.getStarScore());
                commentsWideEntity.setComments(comments.getComments());
                commentsWideEntity.setAssetsViedoJSON(comments.getAssetsViedoJSON());
                commentsWideEntity.setCreateTime(DateUtil.DateToString(date, DateStyle.YYYY_MM_DD_HH_MM_SS));

                // Goods dimension: nothing more can be resolved without it.
                if (goodInfo == null) {
                    return commentsWideEntity;
                }
                commentsWideEntity.setGoodsName(goodInfo.getGoodsName());
                commentsWideEntity.setShopId(goodInfo.getShopId());

                // Shop dimension, keyed by the shop id taken from the goods record.
                if (goodInfo.getShopId() == null) {
                    return commentsWideEntity;
                }
                String shopDat = jedis.hget("foo_shop:dim_shops", goodInfo.getShopId().toString());
                DimShopsDBEntity shopsDBEntity = DimShopsDBEntity.getDimShops(shopDat);
                boolean shopResolved = shopsDBEntity != null && shopsDBEntity.getShopId() != null;
                // Only trust the shop name when the returned record really is for this shop id.
                if (shopResolved
                        && shopsDBEntity.getShopId().longValue() == goodInfo.getShopId().longValue()) {
                    commentsWideEntity.setShopName(shopsDBEntity.getShopName());
                }

                return commentsWideEntity;
            }
        });

        commentsJsonDataStream.print();

        // Serialize the widened records to JSON strings for Kafka.
        SingleOutputStreamOperator<String> commentsWideDataStream = commentsJsonDataStream.map(new MapFunction<CommentsWideEntity, String>() {
            @Override
            public String map(CommentsWideEntity commentsWideEntity) throws Exception {
                return JSON.toJSONString(commentsWideEntity, SerializerFeature.DisableCircularReferenceDetect);
            }
        });

        // Send the dimension-enriched records to Kafka.
        commentsWideDataStream.addSink(new RichSinkFunction<String>() {
            // One producer per sink instance, created in open() and released in close().
            // Building a new producer per record leaked a client (and its threads) each time.
            private transient KafkaProducer<String, String> kafkaProducer;

            @Override
            public void open(Configuration parameters) throws Exception {
                kafkaProducer = new KafkaProducer<>(KafkaUtil.kafkaCons);
            }

            @Override
            public void close() throws Exception {
                if (kafkaProducer != null) {
                    kafkaProducer.close();
                }
            }

            @Override
            public void invoke(String value, Context context) throws Exception {
                ProducerRecord<String, String> logs = new ProducerRecord<>("dwd_comments", null, value);
                kafkaProducer.send(logs);
            }
        });

        env.execute("");
    }

}
